
Conversation

@nabinchha (Contributor) commented Nov 4, 2025

  • Google doc with change proposal.
  • Add a mechanism to provide a selection strategy to SeedConfig (works with either sampling strategy).
    • IndexRange lets you directly specify the start/end (inclusive) indices in the seed dataset that should be used for sampling.
    • PartitionBlock lets you specify the same range via a block address (index), by telling Data Designer how many blocks there are and which block index to use.
    • Updated/added unit tests.

Example notebook with workflow

import os
from data_designer.essentials import (
    SamplerColumnConfig,
    CategorySamplerParams,
    DataDesigner,
    DataDesignerConfigBuilder,
    InferenceParameters,
    LLMTextColumnConfig,
    ModelConfig,
    SamplerType,
    SamplingStrategy,
    IndexRange,
    PartitionBlock,
)

# Make sure NVIDIA_API_KEY is set to your build.nvidia.com API key
os.environ["NVIDIA_API_KEY"] = os.environ["NVBUILD_API_KEY"]

# Design your data designer workflow
config_builder = DataDesignerConfigBuilder(
    model_configs=[
        ModelConfig(
            alias="nano-v2",
            model="nvidia/nvidia-nemotron-nano-9b-v2",
            inference_parameters=InferenceParameters(
                max_tokens=2048,
                temperature=0.50,
            ),
        )
    ]
)

config_builder.add_column(
    LLMTextColumnConfig(
        name="greetings_completion",
        model_alias="nano-v2",
        prompt="""
        {{name}} speaks {{language}} language. Someone said "{{greetings}}" to {{name}}. Write a casual and formal response greeting in '{{language}}' language.
        """,
    )
)

# Run your data designer workflow for a preview
data_designer = DataDesigner(artifact_path="my_dataset")

# Add seeds (this parquet has three columns: name, language, greetings)
seed_reference = data_designer.make_seed_reference_from_file("example_seeds.parquet")
# config_builder.with_seed_dataset(seed_reference, sampling_strategy=SamplingStrategy.SHUFFLE, selection_strategy=IndexRange(start=0, end=1))

config_builder.with_seed_dataset(
    seed_reference,
    sampling_strategy=SamplingStrategy.SHUFFLE,
    selection_strategy=PartitionBlock(partition_index=1, num_partitions=3),
)

preview_results = data_designer.preview(config_builder=config_builder)

Example preview output

[17:10:06] [INFO] 🕵️ Preview generation in progress
[17:10:06] [INFO] ✅ Validation passed
[17:10:06] [INFO] ⛓️ Sorting column configs into a Directed Acyclic Graph
[17:10:06] [INFO] 🩺 Running health checks for models...
[17:10:06] [INFO]   |-- 👀 Checking 'nvidia/nvidia-nemotron-nano-9b-v2'...
[17:10:07] [INFO]   |-- ✅ Passed!
[17:10:07] [INFO] 🌱 Sampling 10 records from seed dataset
[17:10:07] [INFO]   |-- seed dataset size: 11 records
[17:10:07] [INFO]   |-- sampling strategy: shuffle
[17:10:07] [INFO]   |-- selection strategy: PartitionBlock
{
  "partition_index": 1,
  "num_partitions": 3
}
[17:10:07] [INFO]   |-- seed dataset size after selection: 3 records
[17:10:07] [INFO] 📝 Preparing llm-text column generation
[17:10:07] [INFO]   |-- column name: 'greetings_completion'
[17:10:07] [INFO]   |-- model config:
{
  "alias": "nano-v2",
  "model": "nvidia/nvidia-nemotron-nano-9b-v2",
  "inference_parameters": {
      "temperature": 0.5,
      "top_p": null,
      "max_tokens": 2048,
      "max_parallel_requests": 4,
      "timeout": null,
      "extra_body": null
  },
  "provider": null
}
[17:10:07] [INFO]   |-- default model provider: 'nvidia'
[17:10:07] [INFO] 🐙 Processing llm-text column 'greetings_completion' with 4 concurrent workers
[17:10:22] [INFO] 📊 Model usage summary:
{
  "nvidia/nvidia-nemotron-nano-9b-v2": {
      "token_usage": {
          "prompt_tokens": 393,
          "completion_tokens": 5339,
          "total_tokens": 5732
      },
      "request_usage": {
          "successful_requests": 10,
          "failed_requests": 0,
          "total_requests": 10
      },
      "tokens_per_second": 380,
      "requests_per_minute": 39
  }
}
[17:10:22] [INFO] 📐 Measuring dataset column statistics:
[17:10:22] [INFO]   |-- 📝 column: 'greetings_completion'
[17:10:22] [INFO]   |-- 🌱 column: 'language'
[17:10:22] [INFO]   |-- 🌱 column: 'greetings'
[17:10:22] [INFO]   |-- 🌱 column: 'name'
[17:10:22] [INFO] 🙌 Preview complete!

Using the same query against a HF repo

import duckdb

con = duckdb.connect()

# Load the httpfs extension to enable the hf:// protocol
con.execute("INSTALL httpfs; LOAD httpfs;")

query = f"""
    SELECT * EXCLUDE (row_num) FROM (
        SELECT *, row_number() OVER () as row_num
            FROM 'hf://datasets/fka/awesome-chatgpt-prompts/prompts.csv'
        )
    WHERE row_num > 10 AND row_num <= 25
    ORDER BY RANDOM()
"""

df = con.execute(query).fetchdf()
df.head(100)

Comment on lines 100 to 107
read_query = f"""
    SELECT * EXCLUDE (row_num) FROM (
        SELECT *, row_number() OVER () as row_num
            FROM '{self._dataset_uri}'
        ) sub
    WHERE row_num > {self._index_range.start} AND row_num <= {self._index_range.end + 1}
    {shuffle_query}
"""
Contributor
When the sampling strategy is "shuffle", does this shuffle the entire dataset before indexing, or does it shuffle the indexed array?

Contributor Author

I think it shuffles the indexed array because the where clause is most likely applied to filter first.

@johnnygreco (Contributor) Nov 4, 2025

Is that what we want? Seems like "shuffle" should mean "shuffle the entire dataset" so that all records are equally likely to be present in a given indexed array. I'm not sure it actually matters too much, though, since all records are treated independently – assuming the entire seed dataset is used.

Contributor Author

https://duckdb.org/docs/stable/sql/query_syntax/orderby

It's applied after the where clause filtering.
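
As a tiny self-contained check of that ordering (the table and values below are made up for illustration, not from the PR):

import duckdb

con = duckdb.connect()
# 10-row stand-in for a seed dataset, with rows indexed 0..9.
con.execute("CREATE TABLE seeds AS SELECT range AS idx FROM range(10)")

# WHERE is evaluated before ORDER BY, so only the selected slice (rows 3..5)
# is shuffled; no other rows can appear in the result.
rows = con.execute(
    "SELECT idx FROM seeds WHERE idx BETWEEN 3 AND 5 ORDER BY RANDOM()"
).fetchall()
print(rows)  # some permutation of [(3,), (4,), (5,)]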

Contributor Author

It would be weird to shuffle first and then give exact indices ....

@eric-tramel (Contributor) Nov 4, 2025

Let's think about the user's desired outcomes:

  • Shuffled, no repeat: "Just give me one entry per row, I don't care about anything else"
  • Shuffled, repeat: "I just want a bunch of random outcomes, I'm using my inputs as another sampler."
  • Ordered, no repeat: "I NEED a one-to-one, in order correspondence with this data."
  • Ordered, repeat: "I want multiple epochs/passes over this data, and the same number of them per sample."

How this relates to PartitionBlock:

  • Shuffled, no repeat: Split into partitions, randomize order within partition, no problem.
  • Shuffled, repeat: Depends on how exactly we want to preserve statistics. A true shuffle would shuffle and then split. If the partitions were exactly the same size this would be fine, but that is not true in general (especially for the last partition, which may hold a remainder). There is a fix, though: scale the number of samples drawn from each PartitionBlock by that block's share of the full dataset, which should converge to the same distribution as a full shuffle.
  • Ordered, no repeat: Split into partitions, create a record for each row, no problem.
  • Ordered, repeat: Split into partitions, create N records for each row, no problem.

I think that doing the proportional sampling per PartitionBlock post-partitioning would be desirable, since it parallelizes easily versus needing an operation over the full dataset (see the sketch below).
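
A rough sketch of that proportional-sampling idea. The helper below is hypothetical, not part of this PR: each block draws a number of samples proportional to its share of the full dataset, so concatenating the blocks roughly preserves the shuffled distribution.

def samples_per_partition(total_samples: int, partition_sizes: list[int]) -> list[int]:
    """Allocate samples across partitions in proportion to their row counts (illustrative only)."""
    total_rows = sum(partition_sizes)
    counts = [total_samples * size // total_rows for size in partition_sizes]
    # Hand the rounding remainder to the largest partitions first.
    remainder = total_samples - sum(counts)
    largest_first = sorted(range(len(partition_sizes)), key=lambda i: -partition_sizes[i])
    for i in largest_first[:remainder]:
        counts[i] += 1
    return counts

# e.g. an 11-row seed dataset split into blocks of 4, 4, and 3 rows, drawing 10 samples total:
print(samples_per_partition(10, [4, 4, 3]))  # -> [4, 4, 2]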

@nabinchha (Contributor Author) Nov 4, 2025

The exact partition that is chosen is hidden from the user.

We resolve the block to indices automatically for the user, but the indices stay in order, based on how the partitions are defined.
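
For intuition only, a minimal sketch of how a block address might resolve to an inclusive index range, assuming near-even splits with the remainder folded into the last block; the actual SeedConfig implementation may differ.

def partition_to_index_range(num_rows: int, partition_index: int, num_partitions: int) -> tuple[int, int]:
    """Map a block address to inclusive (start, end) row indices.

    Assumes near-even splitting with any remainder absorbed by the last
    block; illustrative only, not the exact SeedConfig logic.
    """
    block_size = num_rows // num_partitions
    start = partition_index * block_size
    end = num_rows - 1 if partition_index == num_partitions - 1 else start + block_size - 1
    return start, end

# e.g. the 11-record seed dataset from the preview above, split into 3 blocks:
print(partition_to_index_range(11, 1, 3))  # -> (3, 5), i.e. 3 records, matching the preview log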

Contributor Author

For posterity: we chatted about this on Slack. To avoid a skewed distribution getting stuck in the last partition (in the shuffle case), we'd need to pre-shuffle the whole dataset, which is an expensive operation.

I ran a test to see how long it would take DuckDB to shuffle a really large dataset. As an example, we took the web step dataset, which is about 164 GB.

TL;DR:
DuckDB took about half an hour to shuffle the entire dataset and write it to 1000 partitions in a different destination:

srun Execution

srun --account=llmservice_sdg_research --partition=cpu --nodes=1 --ntasks=1 --cpus-per-task=8 --mem=128G --time=2:00:00 bash -c "source /lustre/fsw/portfolios/llmservice/users/nmulepati/test_shuffle/.venv/bin/activate && python shuffle.py"

Output

Starting shuffle and write...
Configuring DuckDB settings...
Reading and shuffling data...
Source: /lustre/fsw/portfolios/llmservice/users/etramel/datasets/stem_pretrain/dataprep/v2_16k_datamix/*.parquet
Destination: /lustre/fsw/portfolios/llmservice/users/nmulepati/test_shuffle/tmp_data
Initial partitioning complete.
Shuffle and write completed in 1733.94 seconds
Output written to: /lustre/fsw/portfolios/llmservice/users/nmulepati/test_shuffle/tmp_data

shuffle.py

import duckdb
import time

start = time.perf_counter()
print("Starting shuffle and write...", flush=True)

# Connect to DuckDB
conn = duckdb.connect()

# Configure DuckDB for memory-efficient operation
# Conservative memory settings to avoid OOM
print("Configuring DuckDB settings...", flush=True)
conn.execute("SET memory_limit='110GB'")  # Leave headroom for system
conn.execute("SET threads=4")  # Reduce threads to save memory
conn.execute("SET temp_directory='/tmp/duckdb_temp'")
conn.execute("SET preserve_insertion_order=false")
# Enable disk-based operations for large datasets
conn.execute("SET max_memory='110GB'")

# Source and destination paths
data_path = "/lustre/fsw/portfolios/llmservice/users/etramel/datasets/stem_pretrain/dataprep/v2_16k_datamix/*.parquet"
output_path = "/lustre/fsw/portfolios/llmservice/users/nmulepati/test_shuffle/tmp_data"

print("Reading and shuffling data...", flush=True)
print(f"Source: {data_path}", flush=True)
print(f"Destination: {output_path}", flush=True)

# Memory-efficient shuffle: just add random partition key, no ORDER BY
# The random partitioning itself provides the shuffle effect
query = f"""
    COPY (
        SELECT *, (random() * 1000)::INTEGER as _partition_key
        FROM '{data_path}'
    ) TO '{output_path}' (
        FORMAT PARQUET,
        PARTITION_BY (_partition_key),
        OVERWRITE_OR_IGNORE true
    )
"""

conn.execute(query)
print("Initial partitioning complete.", flush=True)

elapsed = time.perf_counter() - start
print(f"Shuffle and write completed in {elapsed:.2f} seconds")
print(f"Output written to: {output_path}")

Contributor Author

OK, I've optimized the select query to use LIMIT and OFFSET followed by ORDER BY; the previous implementation was still doing a full table scan (483363d).

An example run to pull a 1000-item slice from the 164 GB partitioned dataset took <1 s:

Starting select...
Start: 435971.227842381
End: 10999
Limit: 1000
Offset: 10000
Query:

SELECT * from (
    SELECT * from '/lustre/fsw/portfolios/llmservice/users/etramel/datasets/stem_pretrain/dataprep/v2_16k_datamix/*.parquet' LIMIT 1000 OFFSET 10000
) ORDER BY RANDOM()

Executing query...
Fetched 1000 rows
Select completed in 0.88 seconds
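
For reference, an inclusive IndexRange presumably maps onto LIMIT/OFFSET roughly like this (a sketch with a placeholder parquet path, not the committed query builder):

def index_range_to_limit_offset(start: int, end: int) -> tuple[int, int]:
    """Translate an inclusive [start, end] row range into (LIMIT, OFFSET)."""
    return end - start + 1, start

limit, offset = index_range_to_limit_offset(10000, 10999)
# 'seeds/*.parquet' is a placeholder path for whatever partitioned dataset is being read.
query = f"""
    SELECT * FROM (
        SELECT * FROM 'seeds/*.parquet' LIMIT {limit} OFFSET {offset}
    ) ORDER BY RANDOM()
"""
print(query)  # LIMIT 1000 OFFSET 10000, matching the run above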

cc @eric-tramel @johnnygreco

Contributor

woohoo

@nabinchha requested a review from johnnygreco on November 4, 2025 19:59
@nabinchha (Contributor Author)

Added new docstring to SeedConfig in 9765643

@johnnygreco previously approved these changes Nov 4, 2025

@johnnygreco (Contributor) left a comment

Nice @nabinchha! Good to go right after we merge #11 (hopefully this doesn't cause any strange conflicts for you)

@nabinchha (Contributor Author) commented Nov 4, 2025

> Nice @nabinchha! Good to go right after we merge #11 (hopefully this doesn't cause any strange conflicts for you)

Minor conflicts! I'll merge after I have an approval.

@nabinchha requested a review from johnnygreco on November 4, 2025 23:29
@nabinchha merged commit 7268290 into main on Nov 4, 2025
10 checks passed
@nabinchha deleted the nm/seed-config-partition-strategy branch on November 4, 2025 23:36